{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 05: Inverted Index - Beyond Word Count\n",
    "\n",
    "The fifth example is in two-parts. The first part simulates a web crawler that builds an index of documents to words, the first step for computing the *inverted index* used by search engines, from words to documents. The documents \"crawled\" are sample emails from the infamous [Enron email dataset](http://www.aueb.gr/users/ion/data/enron-spam/), each of which has been previously classified already as SPAM or HAM.\n",
    "\n",
    "See the corresponding Spark jobs [Crawl5a.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/Crawl5a.scala) and [InvertedIndex5b.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/InvertedIndex5b.scala)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Crawl Data\n",
    "\n",
    "The \"crawl\" step uses a convenient `SparkContext` method called `wholeTextFiles`, which is given a directory \"glob\". The default we use is `../data/enron-spam-ham/*`, which expands to `../data/enron-spam-ham/ham100` and `../data/enron-spam-ham/spam100`. \n",
    "\n",
    "This method returns records of the form `(file_name, file_contents)`, where the `file_name` is the absolute path to a file found in one of the directories, and `file_contents` contains its contents, including nested linefeeds. To make it easier to read the data, we'll strip off the leading path elements in the file name (not normally recommended) and remove the embedded linefeeds, so that each final record is on a single line.\n",
    "\n",
    "Here is an example line from the output:\n",
    "\n",
    "```scala\n",
    "(0038.2001-08-05.SA_and_HP.spam.txt,  Subject: free foreign currency newsletter ...)\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "in = ../data/enron-spam-ham/*\n",
       "out = output/crawl\n",
       "separator = /\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "/"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val in  = \"../data/enron-spam-ham/*\"    // Note the \"*\"\n",
    "val out = \"output/crawl\"                // Used for the inverted index calculation below\n",
    "\n",
    "val separator = java.io.File.separator  // e.g., \"/\" on *NIX systems"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now process the data. After loading with `SparkContext.wholeTextFiles`, we post process the data in two ways. First, we remove the directory prefix from the file name in the \"key\" position of the two-element tuple, to make the data easier to read, as discussed above. Second, the file contents (in the \"value\" position) still contains linefeeds. We remove those, so that the _inverted index_ calculcation below can treat each line as a complete record."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "files_contents = ../data/enron-spam-ham/* MapPartitionsRDD[1] at wholeTextFiles at <console>:29\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "../data/enron-spam-ham/* MapPartitionsRDD[1] at wholeTextFiles at <console>:29"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val files_contents = sc.wholeTextFiles(in)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "crawled = MapPartitionsRDD[2] at map at <console>:32\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[2] at map at <console>:32"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val crawled = files_contents.map{\n",
    "  case (path, text) =>          // use pattern matching to decompose the tuple into its parts\n",
    "    val lastSep = path.lastIndexOf(separator)\n",
    "    val path2 = \n",
    "      if (lastSep < 0) path.trim \n",
    "      else path.substring(lastSep+1, path.length).trim  // remove dir prefix\n",
    "    val text2 = text.trim.replaceAll(\"\"\"\\s*\\n\\s*\"\"\", \" \")  // remove extra whitespace and \"\\n\"\n",
    "    (path2, text2)                // return RDD[(String, String)]\n",
    "}"
   ]
  },
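  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see what this transformation does to a single record, here is a minimal sketch that applies the same logic to one made-up `(path, text)` pair using plain Scala strings, with no Spark involved. The sample path and text are invented for illustration, and `cleanRecord` is a hypothetical helper name, not part of the tutorial code.\n",
    "\n",
    "```scala\n",
    "// Hypothetical helper that mirrors the body of the `map` above.\n",
    "def cleanRecord(path: String, text: String, separator: String = \"/\"): (String, String) = {\n",
    "  val lastSep = path.lastIndexOf(separator)\n",
    "  // Drop the directory prefix, if there is one.\n",
    "  val name = if (lastSep < 0) path.trim else path.substring(lastSep + 1).trim\n",
    "  // Replace each linefeed and its surrounding whitespace with a single space.\n",
    "  val flat = text.trim.replaceAll(\"\"\"\\s*\\n\\s*\"\"\", \" \")\n",
    "  (name, flat)\n",
    "}\n",
    "\n",
    "// Made-up record, shaped like one element returned by wholeTextFiles.\n",
    "val sample = (\"file:/tmp/data/spam100/0001.sample.spam.txt\", \"Subject: hello  \\n  world\\n\")\n",
    "println(cleanRecord(sample._1, sample._2))\n",
    "// prints (0001.sample.spam.txt,Subject: hello world)\n",
    "```"
   ]
  },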
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Save the output. Don't forget to delete the output directory if you run this more than once!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "crawled.saveAsTextFile(out)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Inverted Index Calculation\n",
    "\n",
    "Now that we have our \"crawl\" data, let's compute the inverted index for it.\n",
    "\n",
    "First, define the output location for these results and a regular expression used to parse the crawl data output, which has the form `(path,text)`. So we have to eliminate the outer `(...)` and the `,`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "iiout = output/inverted-index\n",
       "lineRE = ^\\s*\\(([^,]+),(.*)\\)\\s*$\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "^\\s*\\(([^,]+),(.*)\\)\\s*$"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val iiout = \"output/inverted-index\"\n",
    "val lineRE = \"\"\"^\\s*\\(([^,]+),(.*)\\)\\s*$\"\"\".r"
   ]
  },
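  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick check of how this pattern behaves, the sketch below applies it to two made-up lines using ordinary Scala pattern matching. The name `sampleRE` and both sample lines are assumptions for illustration; the pattern itself is copied from `lineRE`. Because the first group is `[^,]+`, the split happens at the first comma, so commas inside the text are preserved.\n",
    "\n",
    "```scala\n",
    "// Same pattern as `lineRE` above, repeated so this sketch stands alone.\n",
    "val sampleRE = \"\"\"^\\s*\\(([^,]+),(.*)\\)\\s*$\"\"\".r\n",
    "\n",
    "// Made-up lines: the first has the expected form, the second does not.\n",
    "val sampleLines = Seq(\n",
    "  \"(0001.sample.spam.txt,Subject: hello, world)\",\n",
    "  \"not a crawl record\")\n",
    "\n",
    "sampleLines.foreach {\n",
    "  case sampleRE(name, text) => println(s\"name=$name text=$text\")\n",
    "  case other                => println(s\"no match: $other\")\n",
    "}\n",
    "// prints name=0001.sample.spam.txt text=Subject: hello, world\n",
    "// prints no match: not a crawl record\n",
    "```"
   ]
  },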
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Parse the input lines using the regex. If that fails, return an empty tuple `(\"\", \"\")`, which can be filtered out later.\n",
    "\n",
    "> **Note:** Rather than read the crawl data from the file system, we could just use the in-memory `RDD` above, but this approach is more realistic for real-world use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ii_input = MapPartitionsRDD[6] at map at <console>:31\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[6] at map at <console>:31"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val ii_input = sc.textFile(out).map {       // crawl data written to \"out\" above\n",
    "  case lineRE(name, text) => (name.trim, text.toLowerCase)  // success!\n",
    "  case badLine =>\n",
    "    Console.err.println(s\"Unexpected line: $badLine\")\n",
    "    (\"\", \"\")        // If any of these were returned, you could filter them out below.\n",
    "}                   // RDD[(String,String)] of (path,text) pairs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The steps in this very long expression:\n",
    "\n",
    "1. Split on non-alphabetical sequences of character as before. But this time, return `(word, path)` pair instead of just the words.\n",
    "2. Rather than `map` to `(word, 1)` tuples as before, use `((word, path), 1)`, where `(word, path)` is a \"key\".\n",
    "3. Count the unique occurrences of those keys (pairs).\n",
    "4. My favorite line is the next `map`: pattern match on the nested tuple to extract the three elements and reconstruct as new tuples with the `word` as key. Note how readible and obvious pattern matching makes this expression!!\n",
    "5. Group over the words, creating records: `RDD[(String, Iterable(String,Int)]]`, e.g., `(foo, Seq((path1, n1), (path2, n2), (path3, n3), ...))`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ii = ShuffledRDD[11] at groupByKey at <console>:48\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "ShuffledRDD[11] at groupByKey at <console>:48"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val ii = ii_input.\n",
    "  flatMap {\n",
    "    case (path, text) =>\n",
    "      // If we don't trim leading whitespace, the regex split creates an undesired leading \"\" word!\n",
    "      text.trim.split(\"\"\"[^\\p{IsAlphabetic}]+\"\"\").map(word => (word, path))\n",
    "  }.               // RDD[(String,String)] of (word,path) pairs\n",
    "  map {\n",
    "    case (word, path) => ((word, path), 1)\n",
    "  }.               // RDD[((String,String),Int)] of ((word,path),1) pairs\n",
    "  reduceByKey {    // Count the equal (word, path) pairs, as before\n",
    "    (count1, count2) => count1 + count2\n",
    "  }.               // RDD[((String,String),Int)], now with unique (word,path) and int value >= 1\n",
    "  map {            // Rearrange the tuples; word is now the key we want.\n",
    "    case ((word, path), n) => (word, (path, n))\n",
    "  }.               // RDD[(String,(String,Int))]\n",
    "  groupByKey       // There is a also a more general groupBy\n",
    "                   // RDD[(String, Iterable[(String,Int)]]"
   ]
  },
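  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To watch the five steps work on something small, the sketch below mimics them with ordinary Scala collections instead of RDDs. The two documents are made up, and `groupBy` on a local `Seq` is only a stand-in for Spark's `reduceByKey` and `groupByKey`, so treat it as an illustration of the data shapes rather than of Spark's execution.\n",
    "\n",
    "```scala\n",
    "// Made-up (path, text) records standing in for `ii_input`.\n",
    "val docs = Seq(\n",
    "  (\"a.txt\", \"spark makes big data simple\"),\n",
    "  (\"b.txt\", \"big data, big plans\"))\n",
    "\n",
    "// Steps 1-3: emit ((word, path), 1) records, then sum the 1s for each key.\n",
    "// Here groupBy followed by sum plays the role of reduceByKey.\n",
    "val counted: Seq[((String, String), Int)] =\n",
    "  docs.\n",
    "    flatMap { case (path, text) =>\n",
    "      text.trim.split(\"\"\"[^\\p{IsAlphabetic}]+\"\"\").toSeq.map(word => ((word, path), 1))\n",
    "    }.\n",
    "    groupBy { case (key, _) => key }.\n",
    "    toSeq.\n",
    "    map { case (key, ones) => (key, ones.map(_._2).sum) }\n",
    "\n",
    "// Steps 4-5: rearrange so the word is the key, then group by word.\n",
    "val index: Map[String, Seq[(String, Int)]] =\n",
    "  counted.\n",
    "    map { case ((word, path), n) => (word, (path, n)) }.\n",
    "    groupBy { case (word, _) => word }.\n",
    "    map { case (word, pairs) => (word, pairs.map(_._2)) }\n",
    "\n",
    "// Print one record per word, sorted by word for readability.\n",
    "index.toSeq.sortBy(_._1).foreach(println)\n",
    "```\n",
    "\n",
    "With this input, `big` should map to `a.txt` with a count of 1 and `b.txt` with a count of 2. The order of the pairs within a record isn't guaranteed."
   ]
  },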
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, convert those iterables to strings for output and write the results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Make a sequence, \"(path1, n1), (path2, n2), (path3, n3), ...\"\n",
    "ii.mapValues(iterator => iterator.mkString(\", \")).\n",
    "   saveAsTextFile(iiout)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Recap\n",
    "\n",
    "It's worth studying this sequence of transformations carefully to understand how it works. Many problems can be solved with these techniques. You might try reading a smaller input file (say the first 5 lines of the crawl output), then hack on the script to dump the `RDD` after each step.\n",
    "\n",
    "A few useful [RDD](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) methods for exploration include `RDD.sample` or `RDD.take`, to select a subset of elements. Use `RDD.saveAsTextFile` to write to a file or use `RDD.collect` to convert the RDD data into a \"regular\" Scala collection back in the notebook.\n",
    "\n",
    "> **Tip:** This job is the most complex we'll do (outside SQL queries), so it's worth exploring the Job UI, <a href =\"https://localhost.4040\" target=\"ii_spark\">localhost:4040</a>, but note that the port may be different if you have other notebooks running concurrently!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exercises\n",
    "\n",
    "There are lots of ways to enhance this code. The exercises that extend the implementation build on each other, so try implementing them all. Recall that exercise solutions are in the project's `src/main/scala/sparktutorial/solns` directory."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 1: Sort by words\n",
    "\n",
    "Try ascending and descending. How much overhead does this add?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 2: Sort the (path,n) tuples\n",
    "\n",
    "Within each record, use the Scala collections API to sort the `Iterable` that holds the `(path,n)` pairs by count `n`. Recall that you would want a real search index to show you the documents first that have a lot to say about the subject. Hint, convert to `Vector` first."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 3: Remove \"stop words\"\n",
    "\n",
    "Really common words, so called \"stop words\", like \"a\", \"an\", \"the\", etc. are pervasive. They don't convey a lot of information, so you normally want to remove them. One way to improve this is to filter out so-called \"stop\" words that aren't useful for the index. \n",
    "\n",
    "**Hint:** Google for \"stop words\" and find a useful list of them. Use a Scala [Set](http://www.scala-lang.org/api/current/scala/collection/Set.html) to hold the words, then add an `RDD.filter` step to remove occurrences of words in the set.\n",
    "\n",
    "1. Why is a `Set` a good choice for this data structure?\n",
    "2. When would it be better to use a separate `RDD` or `Dataframe` for this data?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 4: Implement Term Frequence - Inverse Document Frequency\n",
    "\n",
    "This is a more sophisticated algorithm for dealing with issues like \"stop words\". Look up TF-IDF and try implementing it. Note that Spark already has a built-in library for this algorithm."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 5: Improve performance\n",
    "\n",
    "Try improving performance, although it can be hard to measure on such a small data set. Try reording some steps, combining others, using different API methods. Try to develop an intuition for when some things are more efficient than others. For example, removing data that you don't want sooner rather than later is generally a good idea. (For example, when should you filter stop words for optimal performance?) Use the <a href=\"http://localhost:4040\" target=\"ii_job\">localhost:4040</a> (Job console) to understand the performance impact of various choices. Make sure you don't break the output, but ask yourself what variations in the results are \"tolerable\"."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
